package com.atguigu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL CDC demo.
 *
 * <p>Registers a dynamic table backed by the {@code mysql-cdc} connector over
 * {@code gmall211227.base_trademark} on host {@code hadoop102}, then runs a
 * continuous {@code SELECT *} and prints the resulting changelog stream to
 * stdout. {@code TableResult#print()} triggers job execution, so no explicit
 * {@code env.execute()} call is needed.
 */
public class FlinkCDC02_SQL {
    public static void main(String[] args) {
        // 1. Stream execution environment; parallelism 1 keeps the printed
        //    changelog in a single, ordered output stream.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Table environment bridged onto the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 3. Declare the CDC-backed table. Column names must match the
        //    physical column names of the source MySQL table exactly.
        tableEnv.executeSql("create table test(" +
                "tm_id int," +
                "tm_name String" +
                ")with(" +
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'hadoop102'," +
                "'port' = '3306'," +
                "'username' = 'root'," +
                "'password' = '000000'," +
                "'database-name' = 'gmall211227'," +
                "'table-name' = 'base_trademark'," +
                // Setting this to 'false' allows reading a table that has
                // no primary key (the incremental snapshot algorithm
                // otherwise requires one for chunk splitting).
                "'scan.incremental.snapshot.enabled' = 'false'" +
                ")");

        // 4. Continuous query over the CDC table; print() submits the job
        //    and streams change records to the console.
        tableEnv.executeSql("select * from test").print();
    }
}
